#ifndef __GLOBAL_CACHED_PRIVATE_H__
#define __GLOBAL_CACHED_PRIVATE_H__

/*
 * Copyright 2014 Open Connectome Project (http://openconnecto.me)
 * Written by Da Zheng (zhengda1936@gmail.com)
 *
 * This file is part of SAFSlib.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <atomic>

#include "io_interface.h"
#include "cache.h"
#include "container.h"
#include "comp_io_scheduler.h"

namespace safs
{

// Forward declarations: within this header these types are only referenced
// through raw pointers or std::unique_ptr members, so their full
// definitions are not required here.
class request_allocator;
class req_ext_allocator;
class original_io_request;
class global_cached_io;

// Couples a cached page with an original (application-issued) request.
using page_req_pair = std::pair<thread_safe_page *, original_io_request *>;

class global_cached_io: public io_interface
{
	/**
	 * This class represents a request that is being processed by global_cached_io.
	 */
	class partial_request
	{
		io_request req;
		off_t begin_pg_offset;
		off_t end_pg_offset;
		off_t curr_pg_offset;
		original_io_request *orig;
	public:
		partial_request() {
			begin_pg_offset = 0;
			end_pg_offset = 0;
			curr_pg_offset = 0;
			orig = NULL;
		}

		void init(const io_request &req) {
			this->req = req;
			begin_pg_offset = ROUND_PAGE(req.get_offset());
			end_pg_offset = ROUNDUP_PAGE(req.get_offset() + req.get_size());
			curr_pg_offset = begin_pg_offset;
			orig = NULL;
		}

		bool is_empty() const {
			return curr_pg_offset == end_pg_offset;
		}

		page_id_t get_curr_page_id() const {
			page_id_t pg_id(req.get_file_id(), curr_pg_offset);
			return pg_id;
		}

		void move_next() {
			curr_pg_offset += PAGE_SIZE;
		}

		const io_request &get_request() const {
			return req;
		}

		void init_orig(original_io_request *orig, io_interface *io);

		original_io_request *get_orig() const {
			return orig;
		}

		size_t get_remaining_size() const {
			off_t curr = curr_pg_offset;
			if (curr < req.get_offset())
				curr = req.get_offset();
			return req.get_offset() + req.get_size() - curr;
		}
	};

	long cache_size;
	page_cache::ptr global_cache;
	/* the underlying IO. */
	io_interface::ptr underlying;
	callback::ptr cb;

	std::unique_ptr<request_allocator> req_allocator;
	std::unique_ptr<req_ext_allocator> ext_allocator;
	std::unique_ptr<byte_array_allocator> orig_array_allocator;
	std::unique_ptr<byte_array_allocator> simp_array_allocator;

	// This contains the original requests issued by the application.
	// An original request is placed in this queue when the I/O on a page
	// covered by the request is complete.
	// We need this queue because there is no guarantee that the requests
	// queued on the page are from the same global_cached_io. Once the I/O
	// on the page completes, all requests on the page will be sent to the
	// global_cached_io where the requests were generated.
	thread_safe_FIFO_queue<page_req_pair> pending_requests;

	// It contains the completed asynchronous user requests.
	thread_safe_FIFO_queue<original_io_request *> complete_queue;
	// It contains the completed requests issued to the underlying IO by
	// global_cached_io.
	thread_safe_FIFO_queue<io_request> completed_disk_queue;

	// This contains the requests issued to the underlying IO.
	// There is only one thread that can access the request buffer,
	// it doesn't need to be thread-safe.
	std::vector<io_request> underlying_requests;

	// This contains the requests in the fast process path.
	fifo_queue<std::pair<io_request, thread_safe_page *> > cached_requests;
	// This contains the requests from the application.
	fifo_queue<io_request> user_requests;
	// This is a buffer of requests generated by user tasks.
	fifo_queue<io_request> user_comp_requests;
	// This contains a request from the application. It contains a request
	// in progress.
	partial_request processing_req;
	comp_io_scheduler::ptr comp_io_sched;

	size_t num_pg_accesses;
	size_t num_bytes;		// The number of accessed bytes
	size_t cache_hits;
	size_t num_fast_process;
	size_t num_evicted_dirty_pages;

	// Count the number of async requests.
	// The number of async requests that have been completed.
	atomic_number<size_t> num_completed_areqs;
	// The number of async requests issued by the application.
	atomic_number<size_t> num_issued_areqs;
	// The number of async requests that have been processed.
	// It is useful when we want to count the number of requests being
	// processed.
	atomic_number<size_t> num_processed_areqs;

	atomic_number<size_t> num_to_underlying;
	atomic_number<size_t> num_from_underlying;
	atomic_number<size_t> num_underlying_pages;

	/**
	 * It's another version of read() and write(), but it's responsible
	 * for deleting `req'.
	 */
	ssize_t __read(original_io_request *req, thread_safe_page *p);
	ssize_t __write(original_io_request *orig, thread_safe_page *p,
		std::vector<thread_safe_page *> &dirty_pages);
	int multibuf_completion(io_request *request);

	void wait4req(original_io_request *req);

	int get_num_underlying_reqs() const {
		return num_to_underlying.get() - num_from_underlying.get();
	}

	void send2underlying(io_request &req) {
		if (params.is_merge_reqs()) {
			underlying_requests.push_back(req);
		}
		else {
			io_status status;
			num_to_underlying.inc(1);
			num_underlying_pages.inc(req.get_num_bufs());
			underlying->access(&req, 1, &status);
			if (status == IO_FAIL)
				throw io_exception("fail to issue an I/O request");
		}
	}

	page_cache &get_global_cache() {
		return *global_cache;
	}
public:
	global_cached_io(thread *t, io_interface::ptr, page_cache::ptr cache,
			comp_io_scheduler::ptr sched = NULL);

	~global_cached_io();

	int get_block_size() const {
		// The I/O issued by cached I/O is broken into pages. As a result,
		// an I/O access reads data to multiple pages. Linux AIO doesn't
		// support an I/O request with many buffers. In other words, the I/O
		// request issued by cached I/O can't access very large data. So we
		// have to set a hard maximal number of pages in an I/O request.
		return std::min(io_interface::get_block_size(), 1024);
	}

	int preload(off_t start, long size);
	io_status access(char *buf, off_t offset, ssize_t size, int access_method);
	/**
	 * A request can access data of arbitrary size and from arbitrary offset.
	 */
	void access(io_request *requests, int num, io_status *status = NULL);
	virtual void flush_requests();

	/**
	 * One read can access multiple pages while one write can only write
	 * data in a page because if there is a large write (into multiple pages),
	 * the only IO it can cause is to read the first and the last pages if
	 * the offset of the write isn't aligned to a page size. Otherwise,
	 * we can just simply write data to a page without issuing any IO requests.
	 * TODO the only case that we can optimize writes is that a write needs
	 * to touch two pages and the beginning and the end of the write aren't
	 * aligned with a page size.
	 */
	ssize_t read(io_request &req, thread_safe_page *pages[], int npages,
			original_io_request *orig);

	// Finish processing cached I/O requests.
	void process_cached_reqs();
	// Process a request from the application.
	void process_user_req(std::vector<thread_safe_page *> &dirty_pages,
			io_status *status);
	// Process the remaining requests issued by the application.
	void process_user_reqs(queue_interface<io_request> &queue);

	void queue_requests(page_req_pair reqs[], int num) {
		BOOST_VERIFY(pending_requests.add(reqs, num) == num);
		get_thread()->activate();
	}

	int handle_pending_requests();

	virtual bool set_callback(callback::ptr cb) {
		if (underlying->support_aio())
			this->cb = cb;
		return underlying->support_aio();
	}

	virtual bool have_callback() const {
		return cb != NULL;
	}
	
	virtual callback &get_callback() {
		return *cb;
	}

	virtual bool support_aio() {
		return underlying->support_aio();
	}

	virtual int get_file_id() const {
		return underlying->get_file_id();
	}

	virtual void cleanup() {
		// wait4complete may generate more requests because of user compute
		// tasks. We have to make sure all requests are completed.
		while (num_pending_ios() > 0 || !comp_io_sched->is_empty())
			wait4complete(num_pending_ios());
		underlying->cleanup();
		assert(num_processed_areqs.get() == num_completed_areqs.get());
		assert(num_processed_areqs.get() == num_issued_areqs.get());
		assert(get_num_underlying_reqs() == 0);
		assert(num_underlying_pages.get() == 0);
		assert(pending_requests.is_empty());
		assert(complete_queue.is_empty());
		assert(completed_disk_queue.is_empty());
		assert(underlying_requests.empty());
		assert(cached_requests.is_empty());
		assert(user_requests.is_empty());
		assert(processing_req.is_empty());
	}

	virtual int wait4complete(int num);
	virtual void notify_completion(io_request *reqs[], int num);

	/**
	 * Process the completed requests issued to the disks.
	 * These requests may be part of the users' requests.
	 */
	void process_disk_completed_requests(io_request requests[], int num);
	/**
	 * Process all completed users' requests.
	 */
	int process_completed_requests();
	/**
	 * Process all queued requests.
	 */
	void process_all_requests();

	bool has_pending_requests() {
		return !pending_requests.is_empty();
	}

	virtual int num_pending_ios() const {
		assert(num_issued_areqs.get() >= num_completed_areqs.get());
		return num_issued_areqs.get() - num_completed_areqs.get();
	}

	void finalize_partial_request(io_request &partial, original_io_request *orig);
	void finalize_partial_request(thread_safe_page *p, original_io_request *orig);

	void write_dirty_page(thread_safe_page *p, const page_id_t &pg_id,
			original_io_request *orig);

	void wakeup_on_req(original_io_request *req, int status);

	size_t get_num_areqs() const {
		return num_issued_areqs.get();
	}

	size_t get_num_pg_accesses() const {
		return num_pg_accesses;
	}
	size_t get_num_bytes() const {
		return num_bytes;
	}
	size_t get_cache_hits() const {
		return cache_hits;
	}
	size_t get_num_fast_process() const {
		return num_fast_process;
	}

	virtual void print_state() {
#ifdef STATISTICS
		printf("global cached io %d has %d pending reqs and %ld reqs from underlying\n",
				get_io_id(), num_pending_ios(), num_from_underlying.get());
#endif
		printf("%d completed pending reqs, %d queued completed reqs from underlying\n",
				pending_requests.get_num_entries(), complete_queue.get_num_entries());
		underlying->print_state();
	}
};

}

#endif
